package com.yh.blink.backend;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.TimeCharacteristic;

import java.io.IOException;

/**
 * Demonstrates how to configure checkpointing and a RocksDB state backend on a Flink
 * {@link StreamExecutionEnvironment}.
 *
 * <p>The program only builds the configuration: it registers no operators and never calls
 * {@code env.execute()}, so no job is actually submitted.
 */
public class CheckPointTest {
    /** Interval between two checkpoint triggers, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 1000L;

    /**
     * Maximum time a single checkpoint may take before it is discarded, in milliseconds.
     * NOTE(review): this equals the checkpoint interval, which is very tight for a
     * RocksDB backend writing to HDFS; confirm that this value is intended.
     */
    private static final long CHECKPOINT_TIMEOUT_MS = 1000L;

    /** Minimum pause between the end of one checkpoint and the start of the next, in milliseconds. */
    private static final long MIN_PAUSE_BETWEEN_CHECKPOINTS_MS = 1000L;

    /** Location where checkpoint data is persisted. */
    private static final String CHECKPOINT_DATA_URI = "hdfs://namenode:40010/flink/checkpoints";

    /**
     * Builds a streaming environment with event-time semantics, exactly-once checkpointing
     * and an incremental RocksDB state backend.
     *
     * @param args command-line arguments (unused)
     * @throws IOException declared because constructing the RocksDB state backend may fail
     */
    public static void main(String[] args) throws IOException {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // A single call configures both the checkpoint interval and the checkpointing mode,
        // so no separate enableCheckpointing(interval) / setCheckpointInterval(...) is needed.
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.EXACTLY_ONCE);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();
        checkpointConfig.setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);

        // Keep externalized checkpoints when the job is cancelled so that it can be
        // restored from them later.
        checkpointConfig.enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Minimum pause between two consecutive checkpoints.
        checkpointConfig.setMinPauseBetweenCheckpoints(MIN_PAUSE_BETWEEN_CHECKPOINTS_MS);

        // false: a checkpointing error does not fail the task.
        checkpointConfig.setFailOnCheckpointingErrors(false);

        // RocksDB backend with incremental checkpointing enabled (second argument).
        // FsStateBackend or MemoryStateBackend could be substituted here as alternatives.
        StateBackend backend = new RocksDBStateBackend(CHECKPOINT_DATA_URI, true);
        env.setStateBackend(backend);
    }
}
